package cKafka

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/IBM/sarama"
	"github.com/IBM/sarama/tools/tls"
	"github.com/gin-gonic/gin"

	"gitee.com/csingo/cHelper"
	"gitee.com/csingo/cLog"
)

// ProducerType selects which sarama producer implementation backs a producer.
type ProducerType string

const (
	// PRODUCER_TYPE_SYNC delivers via sarama.SyncProducer (blocking SendMessages).
	PRODUCER_TYPE_SYNC  ProducerType = "sync"
	// PRODUCER_TYPE_ASYNC delivers via sarama.AsyncProducer's Input channel.
	PRODUCER_TYPE_ASYNC ProducerType = "async"
)

// ProducerOption carries everything needed to build (or look up in the
// container cache) a producer; GetID derives the cache keys from it.
type ProducerOption struct {
	ID        string       // caller-chosen identity string, hashed into both cache keys
	Type      ProducerType // sync or async delivery mode
	Config    *sarama.Config
	Brokers   []string // broker addresses; joined into both cache keys
	Topic     string   // topic every message from this producer is sent to
	Partition int32    // partition stamped on every outgoing message
}

// GetID derives the two container cache keys for this option set.
// clientId identifies the shared sarama client (MD5 of ID plus the broker
// list); producerId additionally encodes type, topic and partition so that
// distinct producers sharing one client get distinct keys.
func (i *ProducerOption) GetID() (clientId, producerId string) {
	identify := cHelper.Md5([]byte(i.ID))
	brokers := strings.Join(i.Brokers, ",")

	clientId = identify + ":" + brokers
	producerId = fmt.Sprintf("%s:%s:%s:%s:%d", identify, brokers, i.Type, i.Topic, i.Partition)

	return clientId, producerId
}

// ProducerMessage is the message payload accepted by Produce; it is converted
// into a sarama.ProducerMessage carrying the producer's topic and partition.
type ProducerMessage struct {
	Key     string                // message key
	Value   string                // message body
	Headers []sarama.RecordHeader // optional record headers, passed through as-is
}

// producer is the container-cached wrapper around one sarama producer; at
// most one of syncProducer/asyncProducer is populated, according to typ.
type producer struct {
	id        string // producerId cache key (see ProducerOption.GetID)
	typ       ProducerType
	borkers   []string // NOTE(review): typo for "brokers"; unexported, so renaming is safe but touches NewProducer
	topic     string
	partition int32

	config   *sarama.Config
	client   sarama.Client // long-lived connection shared via the container under clientId
	clientId string

	syncProducer  sarama.SyncProducer  // set when typ == PRODUCER_TYPE_SYNC
	asyncProducer sarama.AsyncProducer // set when typ == PRODUCER_TYPE_ASYNC

	deleted           bool // set by Delete; Close then skips container.deleteOperator
	asyncProcessClose bool // set by Close; signals the async error-draining goroutine to exit
}

// ID returns the producer's cache id (the producerId from ProducerOption.GetID).
func (i *producer) ID() string {
	return i.id
}

// Close shuts the producer down: it signals the async error-draining
// goroutine to stop, closes the underlying sarama producer (logging any
// failure), and removes this operator from the container unless Delete
// already marked it as removed.
func (i *producer) Close(ctx *gin.Context) {
	i.asyncProcessClose = true

	// Both branches log identically except for the mode label.
	logCloseError := func(label string, err error) {
		cLog.WithContext(ctx, map[string]interface{}{
			"source": "cKafka.producer.Close",
			"err":    err.Error(),
		}).Error("cKafka " + label + " producer 关闭异常")
	}

	switch i.typ {
	case PRODUCER_TYPE_SYNC:
		if err := i.syncProducer.Close(); err != nil {
			logCloseError("sync", err)
		}
	case PRODUCER_TYPE_ASYNC:
		if err := i.asyncProducer.Close(); err != nil {
			logCloseError("async", err)
		}
	}

	if !i.deleted {
		container.deleteOperator(i.clientId, i.id)
	}
}

// Delete marks the producer as already removed from the container, so a
// subsequent Close will not call container.deleteOperator again.
func (i *producer) Delete() {
	i.deleted = true
}

// Produce pushes msgs to the producer's topic.
//
// Sync mode sends the whole batch with SendMessages and returns its error;
// async mode queues each message on the Input channel and always returns nil
// (delivery failures surface on the Errors channel drained by the goroutine
// started in NewProducer). An unknown type is a silent no-op, matching the
// rest of this file. Any error is logged before being returned.
func (i *producer) Produce(ctx *gin.Context, msgs []*ProducerMessage) error {
	// Single conversion path shared by both modes (previously duplicated).
	build := func(msg *ProducerMessage) *sarama.ProducerMessage {
		return &sarama.ProducerMessage{
			Topic:     i.topic,
			Key:       sarama.StringEncoder(msg.Key),
			Value:     sarama.StringEncoder(msg.Value),
			Headers:   msg.Headers,
			Partition: i.partition,
		}
	}

	var err error
	switch i.typ {
	case PRODUCER_TYPE_SYNC:
		// Skip the network round-trip entirely on an empty batch.
		if len(msgs) == 0 {
			break
		}
		// Pre-size: the batch length is known up front.
		messages := make([]*sarama.ProducerMessage, 0, len(msgs))
		for _, msg := range msgs {
			messages = append(messages, build(msg))
		}
		err = i.syncProducer.SendMessages(messages)
	case PRODUCER_TYPE_ASYNC:
		for _, msg := range msgs {
			i.asyncProducer.Input() <- build(msg)
		}
	}

	if err != nil {
		cLog.WithContext(ctx, map[string]interface{}{
			"source": "cKafka.producer.Produce",
			"msg":    msgs,
			"err":    err.Error(),
		}).Error("cKafka 推送消息失败")
	}

	return err
}

// NewProducer returns the cached producer for option's identity, creating and
// caching the sarama client and producer on first use.
//
// Fixes over the previous version:
//   - a cached client is now actually reused (before, `client` was left nil
//     when the container already held one, so a second producer on the same
//     client id was built against a nil client);
//   - the constructor error is checked BEFORE the producer is cached and
//     before the async error-draining goroutine starts (before, a broken
//     producer was cached and later returned with a nil error, and the
//     goroutine could dereference a nil asyncProducer);
//   - the error-draining goroutine ranges over Errors() instead of
//     busy-spinning on a select/default loop; the range exits when Close
//     closes the channel.
func NewProducer(option ProducerOption) (*producer, error) {
	clientId, producerId := option.GetID()

	var client sarama.Client
	if cached := container.getClient(clientId); cached != nil {
		// Reuse the long-lived connection shared by producers on this broker set.
		client = cached
	} else {
		var err error
		client, err = sarama.NewClient(option.Brokers, option.Config)
		if err != nil {
			return nil, err
		}
		container.saveClient(clientId, client)
	}

	if op := container.getOperator(clientId, producerId); op != nil {
		return op.(*producer), nil
	}

	p := &producer{
		id:                producerId,
		typ:               option.Type,
		borkers:           option.Brokers,
		topic:             option.Topic,
		partition:         option.Partition,
		config:            option.Config,
		client:            client,
		clientId:          clientId,
		syncProducer:      nil,
		asyncProducer:     nil,
		asyncProcessClose: false,
		deleted:           false,
	}

	var err error
	switch option.Type {
	case PRODUCER_TYPE_SYNC:
		p.syncProducer, err = sarama.NewSyncProducerFromClient(p.client)
	case PRODUCER_TYPE_ASYNC:
		p.asyncProducer, err = sarama.NewAsyncProducerFromClient(p.client)
	}
	if err != nil {
		// Do not cache (or start goroutines for) a half-constructed producer.
		return nil, err
	}

	if option.Type == PRODUCER_TYPE_ASYNC {
		go func(p *producer) {
			// Drain delivery errors; the range terminates once Close closes
			// the Errors channel, so no polling flag is needed here.
			for e := range p.asyncProducer.Errors() {
				cLog.WithContext(nil, map[string]interface{}{
					"source": "cKafka.NewProducer",
					"err":    e.Error(),
				}).Error("cKafka 异步生产者生产消息异常")
			}
		}(p)
	}

	container.saveOperator(clientId, producerId, p)

	return p, nil
}

// GetProducer builds (or fetches from the container cache) a producer for the
// named connection in kafka_config: it validates the connection config,
// translates it into a sarama.Config, derives a cache id that encodes every
// effective setting, and delegates to NewProducer.
func GetProducer(name string, typ ProducerType) (instance *producer, err error) {
	// NOTE(review): errors.New(fmt.Sprintf(...)) is kept (rather than the
	// idiomatic fmt.Errorf) so the file-level "errors" import stays used.
	if kafka_config.Connections == nil {
		return nil, errors.New(fmt.Sprintf("cKafka 配置不存在[%s]", name))
	}

	// Hoist the repeated map lookup; all later reads go through conf.
	conf, ok := kafka_config.Connections[name]
	if !ok || conf == nil {
		return nil, errors.New(fmt.Sprintf("cKafka 配置不存在[%s]", name))
	}
	if conf.Topic == "" {
		return nil, errors.New(fmt.Sprintf("cKafka 配置异常[%s.Topics]", name))
	}
	if len(conf.Brokers) == 0 {
		return nil, errors.New(fmt.Sprintf("cKafka 配置异常[%s.Brokers]", name))
	}
	if conf.Producer == nil {
		return nil, errors.New(fmt.Sprintf("cKafka 配置不存在[%s.Producer]", name))
	}

	// These echo every applied setting into the cache id below, so producers
	// with different effective configs never share a cache slot.
	var tlsCert, tlsKey, partitioner string
	var requiredAck, tlsEnable, tlsInsecureSkipVerify bool
	var keepAlive, dialTimeout, readTimeout, writeTimeout int64
	var maxMessageBytes, maxOpenRequests int

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	if conf.Producer.MaxMessageBytes > 0 {
		maxMessageBytes = conf.Producer.MaxMessageBytes
		config.Producer.MaxMessageBytes = maxMessageBytes
	}
	if conf.Producer.RequiredAck {
		requiredAck = true
		config.Producer.RequiredAcks = sarama.WaitForAll
	}

	if conf.Net != nil {
		netConf := conf.Net
		// Timeouts/keepalive are configured in seconds.
		if netConf.Keepalive > 0 {
			keepAlive = netConf.Keepalive
			config.Net.KeepAlive = time.Duration(keepAlive) * time.Second
		}
		if netConf.MaxOpenRequests > 0 {
			maxOpenRequests = netConf.MaxOpenRequests
			config.Net.MaxOpenRequests = maxOpenRequests
		}
		if netConf.DialTimeout > 0 {
			dialTimeout = netConf.DialTimeout
			config.Net.DialTimeout = time.Duration(dialTimeout) * time.Second
		}
		if netConf.ReadTimeout > 0 {
			readTimeout = netConf.ReadTimeout
			config.Net.ReadTimeout = time.Duration(readTimeout) * time.Second
		}
		if netConf.WriteTimeout > 0 {
			writeTimeout = netConf.WriteTimeout
			config.Net.WriteTimeout = time.Duration(writeTimeout) * time.Second
		}
		if netConf.TLS != nil && netConf.TLS.Enable {
			tlsCert = netConf.TLS.Cert
			tlsKey = netConf.TLS.Key
			tlsConfig, e := tls.NewConfig(tlsCert, tlsKey)
			if e != nil {
				return nil, e
			}

			tlsEnable = true
			tlsInsecureSkipVerify = netConf.TLS.Skip

			config.Net.TLS.Enable = true
			config.Net.TLS.Config = tlsConfig
			config.Net.TLS.Config.InsecureSkipVerify = tlsInsecureSkipVerify
		}
	}

	// Record the EFFECTIVE partitioner choice for the cache id. Previously
	// `partitioner` was declared but never assigned, so the id always carried
	// an empty segment regardless of configuration.
	switch conf.Producer.Partitioner {
	case PRODUCER_PARTITIONER_RANDOM:
		partitioner = "random"
		config.Producer.Partitioner = sarama.NewRandomPartitioner
	case PRODUCER_PARTITIONER_HASH:
		partitioner = "hash"
		config.Producer.Partitioner = sarama.NewHashPartitioner
	case PRODUCER_PARTITIONER_MANUAL:
		// Partition == -1 means "manual without a target": fall back to hash.
		if conf.Producer.Partition == -1 {
			partitioner = "hash"
			config.Producer.Partitioner = sarama.NewHashPartitioner
		} else {
			partitioner = "manual"
			config.Producer.Partitioner = sarama.NewManualPartitioner
		}
	default:
		if conf.Producer.Partition > 0 {
			partitioner = "manual"
			config.Producer.Partitioner = sarama.NewManualPartitioner
		} else {
			partitioner = "hash"
			config.Producer.Partitioner = sarama.NewHashPartitioner
		}
	}

	id := fmt.Sprintf("%s:%t:%t:%d:%t:%d:%d:%d:%d:%d:%t:%t:%s:%s:%s",
		typ,
		config.Producer.Return.Successes,
		config.Producer.Return.Errors,
		maxMessageBytes,
		requiredAck,
		keepAlive,
		maxOpenRequests,
		dialTimeout,
		readTimeout,
		writeTimeout,
		tlsEnable,
		tlsInsecureSkipVerify,
		tlsCert,
		tlsKey,
		partitioner)

	return NewProducer(ProducerOption{
		ID:        id,
		Type:      typ,
		Config:    config,
		Brokers:   conf.Brokers,
		Topic:     conf.Topic,
		Partition: conf.Producer.Partition,
	})
}

// GetSyncProducer returns the synchronous producer configured under name.
// (Unused named result parameters removed.)
func GetSyncProducer(name string) (*producer, error) {
	return GetProducer(name, PRODUCER_TYPE_SYNC)
}

// GetAsyncProducer returns the asynchronous producer configured under name.
// (Unused named result parameters removed.)
func GetAsyncProducer(name string) (*producer, error) {
	return GetProducer(name, PRODUCER_TYPE_ASYNC)
}

// DefaultProducer returns the synchronous producer for the "default"
// connection. (Unused named result parameters removed.)
func DefaultProducer() (*producer, error) {
	return GetProducer("default", PRODUCER_TYPE_SYNC)
}
